package com.fsh.lingsp.common.websocket.service.impl;

import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.json.JSONUtil;
import com.fsh.lingsp.common.common.event.UserOfflineEvent;
import com.fsh.lingsp.common.common.event.UserOnlineEvent;
import com.fsh.lingsp.common.common.thread.ThreadPoolConfig;
import com.fsh.lingsp.common.user.dao.UserDao;
import com.fsh.lingsp.common.user.domain.entity.User;
import com.fsh.lingsp.common.user.domain.enums.RoleEnum;
import com.fsh.lingsp.common.user.service.LoginService;
import com.fsh.lingsp.common.user.service.RoleService;
import com.fsh.lingsp.common.websocket.adapter.WSAdapter;
import com.fsh.lingsp.common.websocket.domain.dto.WSChannelExtraDTO;
import com.fsh.lingsp.common.websocket.domain.vo.response.WSBaseResp;
import com.fsh.lingsp.common.websocket.service.WebSocketService;
import com.fsh.lingsp.common.websocket.util.NettyUtil;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.mp.api.WxMpService;
import me.chanjar.weixin.mp.bean.result.WxMpQrCodeTicket;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.Date;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * 作者：fsh
 * 日期：2024/02/24
 * <p>
 * 描述：Web 套接字服务 impl
 *
 * 专门管理websocket的逻辑，包括推拉。
 */
@Service
@Slf4j
public class WebSocketServiceImpl implements WebSocketService {

    @Autowired
    @Lazy
    private WxMpService wxMpService;
    @Autowired
    private UserDao userDao;
    @Autowired
    private LoginService loginService;
    @Autowired
    private RoleService roleService;
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;
    @Autowired
    @Qualifier(value = ThreadPoolConfig.WS_EXECUTOR)
    private ThreadPoolTaskExecutor executor;


    //保存所有连接
    private final static ConcurrentHashMap<Channel, WSChannelExtraDTO> ONLINE_WS_MAP=new ConcurrentHashMap<>();
    /**
     * 临时保存 登录code和channel的映射关系。
     * 最大为1000空间，过期时间为1小时
     */
    private static final Cache<Integer,Channel> WAIT_LOGIN_MAP= Caffeine.newBuilder()
            .maximumSize(1000)
            .expireAfterWrite(Duration.ofHours(1))
            .build();


    public static ConcurrentHashMap<Channel, WSChannelExtraDTO> getOnlineMap() {
        return ONLINE_WS_MAP;
    }

    /**
     * 处理所有ws连接的事件。 管理所有用户的连接，包括登录态和游客
     *
     * @param channel 通道
     */
    @Override
    public void connect(Channel channel) {
        //将连接通道 放到map中
        ONLINE_WS_MAP.put(channel,new WSChannelExtraDTO());
    }

    /**
     * 所有在线的用户和对应的socket
     */
    private static final ConcurrentHashMap<Long, CopyOnWriteArrayList<Channel>> ONLINE_UID_MAP = new ConcurrentHashMap<>();

    /**
     * 用户上线
     */
    private void online(Channel channel, Long uid) {
        getOrInitChannelExt(channel).setUid(uid);
        ONLINE_UID_MAP.putIfAbsent(uid, new CopyOnWriteArrayList<>());
        ONLINE_UID_MAP.get(uid).add(channel);
        NettyUtil.setAttr(channel, NettyUtil.UID, uid);
    }

    /**
     * 用户下线
     * return 是否全下线成功
     */
    private boolean offline(Channel channel, Optional<Long> uidOptional) {
        ONLINE_WS_MAP.remove(channel);
        if (uidOptional.isPresent()) {
            CopyOnWriteArrayList<Channel> channels = ONLINE_UID_MAP.get(uidOptional.get());
            if (CollectionUtil.isNotEmpty(channels)) {
                channels.removeIf(ch -> Objects.equals(ch, channel));
            }
            return CollectionUtil.isEmpty(ONLINE_UID_MAP.get(uidOptional.get()));
        }
        return true;
    }

    //如果在线列表不存在，就先把该channel放进在线列表
    private WSChannelExtraDTO getOrInitChannelExt(Channel channel) {
        WSChannelExtraDTO wsChannelExtraDTO =
                ONLINE_WS_MAP.getOrDefault(channel, new WSChannelExtraDTO());
        WSChannelExtraDTO old = ONLINE_WS_MAP.putIfAbsent(channel, wsChannelExtraDTO);
        return ObjectUtil.isNull(old) ? wsChannelExtraDTO : old;
    }



    /**
     * 处理用户登录请求，需要返回一张带code的二维码
     *
     * @param channel 通道
     */
    @Override
    @SneakyThrows
    public void handleLoginReq(Channel channel) {
        //调用方法，获取随机不重复的登录码
        Integer code = generateLoginCode(channel);
        //请求微信接口，获取登录码地址 (二维码过期时间为1h)，并把code放在里面(这个code就是后续的eventKey)
        WxMpQrCodeTicket wxMpQrCodeTicket = wxMpService.getQrcodeService().qrCodeCreateTmpTicket(code, (int)Duration.ofHours(1).getSeconds());
        //把码返回给前端，wxMpQrCodeTicket里面包含了二维码的url。
        sendMsg(channel, WSAdapter.buildLoginResp(wxMpQrCodeTicket));
    }

    /**
     * 返回给前端消息
     */
    private void sendMsg(Channel channel, WSBaseResp<?> buildLoginResp) {
        channel.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(buildLoginResp)));
    }

    /**
     * 生成随机不重复的登录码
     */
    private Integer generateLoginCode(Channel channel) {
        Integer code=null;
        do{
            //1、随机生成一个码
            code= RandomUtil.randomInt(Integer.MAX_VALUE);
            //2、放进map，key不重复才能放成功。key重复返回key(非null)，不重复才能放进。
            //   所以只要nonNull，就是重复了，重新生成一个key，直到不重复。
        }while(Objects.nonNull(WAIT_LOGIN_MAP.asMap().putIfAbsent(code,channel)));
        //3、返回code
        log.info("WebSocketServiceImpl.generateLoginCode()===>code={}",code);
        return code;
    }


    /**
     * 用户下线统一处理
     *
     * @param channel
     */
    @Override
    public void removed(Channel channel) {
        WSChannelExtraDTO wsChannelExtraDTO = ONLINE_WS_MAP.get(channel);
        //用户下线后的额外逻辑
        Optional<Long> uidOptional = Optional.ofNullable(wsChannelExtraDTO)
                .map(WSChannelExtraDTO::getUid);
        boolean offlineAll = offline(channel, uidOptional);
        if (uidOptional.isPresent() && offlineAll) {//已登录用户断连,并且全下线成功
            User user = new User();
            user.setId(uidOptional.get());
            user.setLastOptTime(new Date());
            applicationEventPublisher.publishEvent(new UserOfflineEvent(this, user));
        }
    }

    /**
     * 登陆成功的逻辑，通过code找到给channel推送消息
     * @param code
     * @param id
     */
    @Override
    public void scanLoginSuccess(Integer code, Long id) {
        //确认连接在机器上
        Channel channel = WAIT_LOGIN_MAP.getIfPresent(code);
        if (Objects.isNull(channel)){
            return;
        }
        User user=userDao.getById(id);
        //移除code
        WAIT_LOGIN_MAP.invalidate(code);
        //调用登录模块获取token
        String token=loginService.login(id);
        //用户登录成功，建立 channel和uid的映射，并返回给前端消息
        loginSuccess(channel,user,token);
    }

    //登陆成功，建立 uid和channel的映射关系,并返回给前端消息。即登陆成功后的逻辑
    private void loginSuccess(Channel channel, User user, String token) {
        //更新上线列表
        online(channel, user.getId());
        //建立 uid和channel的映射关系
        WSChannelExtraDTO wsChannelExtraDTO = ONLINE_WS_MAP.get(channel);
        wsChannelExtraDTO.setUid(user.getId());
        //返回给前端消息。
        // 返回该用户的 角色。这里直接判断了该用户是否有聊天室管理员这个角色权限。有就返回true，没有就返回false(普通群员)
        sendMsg(channel, WSAdapter.buildUserResp(user,token,roleService.hasPower(user.getId(), RoleEnum.CHAT_MANGER)));

        //用户上线成功的事件。1.做ip归属地功能
        //更新用户 最近一次登录时间 和 ip信息
        user.setLastOptTime(new Date());
        // 用到了充血模型。 在实体类中写方法。
        user.refreshIp(NettyUtil.getAttr(channel,NettyUtil.IP));
        //注册用户上线的事件（ip归属地解析的时间点）
        applicationEventPublisher.publishEvent(new UserOnlineEvent(this,user));
    }

    /**
     * 等待授权，正在授权中
     *
     * @param code
     */
    @Override
    public void waitAuthorize(Integer code) {
        Channel channel = WAIT_LOGIN_MAP.getIfPresent(code);
        if (Objects.isNull(channel)){
            return;
        }
        sendMsg(channel,WSAdapter.buildWaitAuthorize());
    }

    /**
     * 前提是：用户已经在网站 扫码登录注册授权过了，前端已经存了之前的token。
     * 登录认证：前端建立ws连接，传过来保存的token，拿到用户的信息
     *
     * @param channel
     * @param token
     */
    @Override
    public void authorize(Channel channel, String token) {
        Long validUid = loginService.getValidUid(token);
        //能拿到uid说明没过期
        if (Objects.nonNull(validUid)){
            //返回前端用户信息
            User user = userDao.getById(validUid);
            //登陆成功，建立 uid和channel的映射关系。即登陆成功后的逻辑
            loginSuccess(channel,user,token);
        }else{
            // token 过期，给前端发送ws消息，提示token过期，清除前端存储的token，重新扫码登录
            sendMsg(channel,WSAdapter.buildInvalidTokenResp());
        }
    }

    /**
     * 推送消息给全部在线人员
     *      使用自定义的线程池
     * @param resp
     */
    @Override
    public void sendMsgToAll(WSBaseResp<?> resp) {
        ONLINE_WS_MAP.forEach((channel, wsChannelExtraDTO) -> executor.execute(()->{
            sendMsg(channel,resp);
        }));
    }

    /**
     * 针对 uid 推送到  他的 channel 中
     */
    @Override
    public void sendToUid(WSBaseResp<?> wsBaseResp, Long uid) {
        //虽然这是list集合，但是一般实际上只有一个channel。 多端登录就会有两个或者多个channel，
        CopyOnWriteArrayList<Channel> channels = ONLINE_UID_MAP.get(uid);
        if (CollectionUtil.isEmpty(channels)) {
            log.info("用户：{}不在线", uid);
            return;
        }
        channels.forEach(channel -> {
            executor.execute(() -> sendMsg(channel, wsBaseResp));
        });
    }

    /**
     * 直接  给每个 连接channel  都推送消息
     *///entrySet的值不是快照数据,但是它支持遍历，所以无所谓了，不用快照也行。
    @Override
    public void sendToAllOnline(WSBaseResp<?> wsBaseResp, Long skipUid) {
        ONLINE_WS_MAP.forEach((channel, ext) -> {
            if (Objects.nonNull(skipUid) && Objects.equals(ext.getUid(), skipUid)) {
                return;
            }
            executor.execute(() -> sendMsg(channel, wsBaseResp));
        });
    }
}
